package com.jokerku.mini.schedule.service;

import com.jokerku.mini.schedule.common.Constants;
import com.jokerku.mini.schedule.event.ScheduleEventListener;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Static helper methods for ZooKeeper operations performed through Apache Curator.
 *
 * @Author: guzq
 * @CreateTime: 2023/05/24 17:43
 * @Description: ZooKeeper operations utility class
 * @Version: 1.0
 */
public class ZkCuratorServer {

    private static final Logger logger = LoggerFactory.getLogger(ZkCuratorServer.class);

    /**
     * Upper bound, in seconds, that {@link #appendPersistentData} waits for the
     * ephemeral node to be created for the first time.
     */
    private static final long INITIAL_CREATE_WAIT_SECONDS = 5L;

    /**
     * Returns the shared Curator client stored in {@code Constants.Global.client},
     * building and starting it on first use.
     *
     * <p>If a client has already been stored, it is returned as-is and
     * {@code connectString} is ignored. The method is {@code synchronized} so that
     * concurrent first callers cannot each build and start their own client.
     *
     * @param connectString ZooKeeper connection string used when the client has to be created
     * @return the shared, started client
     */
    public static synchronized CuratorFramework getClient(String connectString) {
        if (Constants.Global.client != null) {
            return Constants.Global.client;
        }
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .retryPolicy(retryPolicy)
                .sessionTimeoutMs(3000)
                .connectString(connectString)
                .build();
        // Log connection and reconnection events; other state changes are ignored here.
        client.getConnectionStateListenable().addListener((curatorFramework, connectionState) -> {
            switch (connectionState) {
                // Sent for the first successful connection to the server
                case CONNECTED:
                    logger.info("mini schedule init server connected {}", connectString);
                    break;
                // A suspended, lost, or read-only connection has been re-established
                case RECONNECTED:
                    logger.info("mini schedule init server reconnected {}", connectString);
                    break;
                default:
                    break;
            }
        });
        client.start();
        Constants.Global.client = client;

        return client;
    }

    /**
     * Deletes a node together with all of its children; does nothing when the node is absent.
     *
     * @param client ZooKeeper client
     * @param path   node path
     * @throws Exception if the existence check or the delete fails
     */
    public static void deletingChildrenIfNeeded(CuratorFramework client, String path) throws Exception {
        if (null == client.checkExists().forPath(path)) {
            return;
        }
        // Recursive delete: children are removed before the node itself.
        client.delete().deletingChildrenIfNeeded().forPath(path);
    }

    /**
     * Deletes a single node; does nothing when the node is absent.
     *
     * @param client ZooKeeper client
     * @param path   node path
     * @throws Exception if the existence check or the delete fails
     */
    public static void delete(CuratorFramework client, String path) throws Exception {
        if (null == client.checkExists().forPath(path)) {
            return;
        }
        client.delete().forPath(path);
    }

    /**
     * Creates the node and every ancestor prefix of its path that does not exist yet.
     *
     * <p>The path is repeatedly truncated at the last {@code Constants.Global.LINE}
     * separator to collect its ancestors, which are then created from the top-most
     * one down to {@code path} itself.
     *
     * @param client ZooKeeper client
     * @param path   node path
     * @throws Exception if an existence check or a create fails
     */
    public static void create(CuratorFramework client, String path) throws Exception {
        // Collected deepest-first: [path, parent, grandparent, ...].
        List<String> pathChild = new ArrayList<>();
        pathChild.add(path);
        while (path.lastIndexOf(Constants.Global.LINE) > 0) {
            path = path.substring(0, path.lastIndexOf(Constants.Global.LINE));
            pathChild.add(path);
        }
        // Walk the list backwards so ancestors are created before their descendants.
        for (int i = pathChild.size() - 1; i >= 0; i--) {
            Stat stat = client.checkExists().forPath(pathChild.get(i));
            if (null == stat) {
                client.create().creatingParentsIfNeeded().forPath(pathChild.get(i));
            }
        }
    }

    /**
     * Creates the node (and any missing parents) when it does not exist yet.
     *
     * @param client ZooKeeper client
     * @param path   node path
     * @throws Exception if the existence check or the create fails
     */
    public static void createNodeSimple(CuratorFramework client, String path) throws Exception {
        if (null == client.checkExists().forPath(path)) {
            client.create().creatingParentsIfNeeded().forPath(path);
        }
    }

    /**
     * Writes data to an existing node; does nothing when the node is absent.
     *
     * @param client ZooKeeper client
     * @param path   node path
     * @param data   data to store, encoded with {@code Constants.Global.CHARSET}
     * @throws Exception if the existence check or the write fails
     */
    public static void setData(CuratorFramework client, String path, String data) throws Exception {
        if (null == client.checkExists().forPath(path)) {
            return;
        }
        client.setData().forPath(path, data.getBytes(Constants.Global.CHARSET));
    }

    /**
     * Watches a node and its whole subtree with a {@link TreeCache}.
     *
     * <p>Events that carry no node data are dropped; every other event is passed to
     * {@code eventListener} after an unchecked cast to {@code T}.
     *
     * @param client        ZooKeeper client
     * @param path          root path of the watched subtree
     * @param eventListener consumer of the tree cache events
     * @throws Exception if the cache cannot be started
     */
    @SuppressWarnings("unchecked")
    public static <T> void addTreeCacheListener(CuratorFramework client, String path, ScheduleEventListener<T> eventListener) throws Exception {
        TreeCache treeCache = new TreeCache(client, path);

        // Register the listener before starting the cache so that events fired
        // while the cache is doing its initial load are not missed.
        treeCache.getListenable().addListener((curatorFramework, event) -> {
            // Skip events without node data.
            if (null == event.getData()) {
                return;
            }
            // Hand the event to the caller-supplied listener.
            eventListener.consumeEvent((T) event);
        });
        treeCache.start();
    }

    /**
     * Watches the direct children of a node with a {@link PathChildrenCache}
     * (despite the method name, no {@code NodeCache} is involved).
     *
     * <p>Events that carry no child data are dropped; every other event is passed to
     * {@code eventListener} after an unchecked cast to {@code T}.
     *
     * @param client        ZooKeeper client
     * @param path          parent path whose children are watched
     * @param eventListener consumer of the child events
     * @throws Exception if the cache cannot be started
     */
    @SuppressWarnings("unchecked")
    public static <T> void addNodeCacheListener(CuratorFramework client, String path, ScheduleEventListener<T> eventListener) throws Exception {
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, path, true);

        // Register the listener before starting the cache so that events fired
        // during the initial background load are not missed.
        pathChildrenCache.getListenable().addListener((curatorFramework, event) -> {
            // Skip events without child data.
            if (null == event.getData()) {
                return;
            }

            eventListener.consumeEvent((T) event);
        });
        pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
    }

    /**
     * Creates a plain ephemeral node, creating missing parents as needed.
     *
     * @param client ZooKeeper client
     * @param path   node path
     * @param value  node data, encoded with {@code Constants.Global.CHARSET}
     * @throws Exception if the create fails
     */
    public static void createEphemeralNode(CuratorFramework client, String path, String value) throws Exception {
        client.create().creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath(path, value.getBytes(Constants.Global.CHARSET));
    }

    /**
     * Publishes data in an ephemeral node managed by Curator's
     * {@link PersistentEphemeralNode} recipe.
     *
     * <p>{@code PersistentEphemeralNode} is a Curator recipe, not a ZooKeeper node
     * type. The underlying node is an ordinary ephemeral node; per Curator's
     * documentation the recipe tries to re-create it when it disappears, for example
     * after a connection or session interruption.
     *
     * <p>Any node already present at {@code path} is deleted (with its children)
     * first. The method then waits up to {@value #INITIAL_CREATE_WAIT_SECONDS}
     * seconds for the initial creation and logs a warning if the node has not been
     * created by then. The recipe instance is neither returned nor closed here.
     *
     * @param client ZooKeeper client
     * @param path   node path
     * @param data   node data, encoded with {@code Constants.Global.CHARSET}
     * @throws Exception if the delete fails, the recipe cannot be started, or the wait is interrupted
     */
    public static void appendPersistentData(CuratorFramework client, String path, String data) throws Exception {
        if (null != client.checkExists().forPath(path)) {
            client.delete().deletingChildrenIfNeeded().forPath(path);
        }
        PersistentEphemeralNode node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, path, data.getBytes(Constants.Global.CHARSET));
        node.start();
        // A zero timeout would return immediately without waiting, so use a real bound.
        boolean created = node.waitForInitialCreate(INITIAL_CREATE_WAIT_SECONDS, TimeUnit.SECONDS);
        if (!created) {
            logger.warn("mini schedule ephemeral node {} not created within {}s", path, INITIAL_CREATE_WAIT_SECONDS);
        }
    }


    /**
     * Builds a distributed, re-entrant mutex bound to the given lock path.
     *
     * @param client ZooKeeper client
     * @param path   lock path
     * @return a new, not yet acquired mutex
     */
    public static InterProcessMutex getLock(CuratorFramework client, String path) {
        return new InterProcessMutex(client, path);
    }

    /**
     * Reports whether a node exists.
     *
     * @param client ZooKeeper client
     * @param path   node path
     * @return {@code true} if the node exists; {@code false} if it does not or if the
     *         check itself failed (the failure is logged)
     */
    public static boolean isExisted(CuratorFramework client, String path) {
        try {
            return null != client.checkExists().forPath(path);
        } catch (Exception e) {
            // Best-effort check: keep returning false, but make the failure visible.
            logger.warn("mini schedule check exists failed, path {}", path, e);
            return false;
        }
    }

    /**
     * Reads the data of a node as a string.
     *
     * @param client ZooKeeper client
     * @param path   node path
     * @return the node data decoded with {@code Constants.Global.CHARSET}, or
     *         {@code null} when the node is absent or holds no data
     * @throws Exception if the existence check or the read fails
     */
    public static String getData(CuratorFramework client, String path) throws Exception {
        if (null == client.checkExists().forPath(path)) {
            return null;
        }
        byte[] data = client.getData().forPath(path);
        if (null == data) {
            return null;
        }
        return new String(data, Constants.Global.CHARSET);
    }
}
